package com.huan.flink.sink.localfile;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;

/**
 * Flink job that writes a stream of strings to the local file system.<br/>
 *
 * <a href="https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/">Reference documentation</a>
 *
 * @author huan.fu
 * @date 2023/9/23 - 21:00
 */
public class LocalFileSinkApplication {

    /** Default output directory, used when no path argument is supplied. */
    private static final String DEFAULT_OUTPUT_PATH =
            "/Users/huan/code/IdeaProjects/me/spring-cloud-parent/flink/flink-sink/flink-sink-file/logs";

    /**
     * Builds and submits a streaming job that writes a few sample strings to
     * row-format files on the local file system.
     *
     * @param args optional; {@code args[0]} overrides the output directory
     *             (defaults to {@link #DEFAULT_OUTPUT_PATH})
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {

        // Generalized: allow the output directory to be passed as the first
        // program argument instead of being hard-coded to one machine.
        String outputPath = args.length > 0 ? args[0] : DEFAULT_OUTPUT_PATH;

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each bucket directory will contain one in-progress file per parallel subtask.
        environment.setParallelism(2);
        // Checkpointing must be enabled, otherwise part files stay ".inprogress" forever.
        environment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> ds = environment.fromElements("hello java", "hello flink", "你好 flink");

        // Row-format file sink: output path + UTF-8 string encoder.
        FileSink<String> fileSink = FileSink
                .<String>forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
                // Part-file naming: prefix and suffix.
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                // file name prefix
                                .withPartPrefix("flink-file-sink-")
                                // file name suffix
                                .withPartSuffix(".log")
                                .build()
                )
                // Bucketing: one directory per hour, in the system time zone.
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // Rolling policy: roll a part file every 1 minute OR once it reaches 1 KiB.
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024))
                                .build()
                )
                .build();

        ds
                // Random (shuffle) partitioning across subtasks.
                .shuffle()
                // Write the stream to the file sink.
                .sinkTo(fileSink);

        // Fixed typo in the job name shown in the Flink UI: "lock" -> "local".
        environment.execute("sink local file job");
    }
}
